// Copyright 2018 The Cockroach Authors.
// Copyright (c) 2022-present, Shanghai Yunxi Technology Co, Ltd. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This software (KWDB) is licensed under Mulan PSL v2.
// You can use this software according to the terms and conditions of the Mulan PSL v2.
// You may obtain a copy of Mulan PSL v2 at:
//          http://license.coscl.org.cn/MulanPSL2
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PSL v2 for more details.

package xform

import (
	"math/rand"
	"reflect"

	"gitee.com/kwbasedb/kwbase/pkg/keys"
	"gitee.com/kwbasedb/kwbase/pkg/sql/opt"
	"gitee.com/kwbasedb/kwbase/pkg/sql/opt/cat"
	"gitee.com/kwbasedb/kwbase/pkg/sql/opt/memo"
	"gitee.com/kwbasedb/kwbase/pkg/sql/opt/norm"
	"gitee.com/kwbasedb/kwbase/pkg/sql/opt/ordering"
	"gitee.com/kwbasedb/kwbase/pkg/sql/opt/props/physical"
	"gitee.com/kwbasedb/kwbase/pkg/sql/pgwire/pgcode"
	"gitee.com/kwbasedb/kwbase/pkg/sql/pgwire/pgerror"
	"gitee.com/kwbasedb/kwbase/pkg/sql/sem/tree"
	"gitee.com/kwbasedb/kwbase/pkg/util"
	"gitee.com/kwbasedb/kwbase/pkg/util/errorutil"
	"gitee.com/kwbasedb/kwbase/pkg/util/log"
	"github.com/cockroachdb/errors"
)

// MatchedRuleFunc defines the callback function for the NotifyOnMatchedRule
// event supported by the optimizer. See the comment in factory.go for more
// details.
type MatchedRuleFunc = norm.MatchedRuleFunc

// AppliedRuleFunc defines the callback function for the NotifyOnAppliedRule
// event supported by the optimizer. See the comment in factory.go for more
// details.
type AppliedRuleFunc = norm.AppliedRuleFunc

// RuleSet efficiently stores an unordered set of RuleNames.
type RuleSet = util.FastIntSet

// Optimizer transforms an input expression tree into the logically equivalent
// output expression tree with the lowest possible execution cost.
//
// To use the optimizer, construct an input expression tree by invoking
// construction methods on the Optimizer.Factory instance. The factory
// transforms the input expression into its canonical form as part of
// construction. Pass the root of the tree constructed by the factory to the
// Optimize method, along with a set of required physical properties that the
// expression must provide. The optimizer will return an Expr over the output
// expression tree with the lowest cost.
type Optimizer struct {
	evalCtx *tree.EvalContext

	// f is the factory that creates the normalized expressions during the first
	// optimization phase.
	f norm.Factory

	// catalog is the Catalog object that's used to resolve SQL names.
	catalog cat.Catalog

	// mem is the Memo data structure containing the forest of query plans that
	// are generated by the optimizer.
	mem *memo.Memo

	// explorer generates alternate, equivalent expressions and stores them in
	// the memo.
	explorer explorer

	// defaultCoster implements the default cost model. If SetCoster is not
	// called, this coster will be used.
	defaultCoster coster

	// coster is set by default to reference defaultCoster, but can be overridden
	// by calling SetCoster.
	coster Coster

	// stateMap allocates temporary storage that's used to speed up optimization.
	// This state could be discarded once optimization is complete.
	stateMap   map[groupStateKey]*groupState
	stateAlloc groupStateAlloc

	// matchedRule is the callback function that is invoked each time an
	// optimization rule (Normalize or Explore) has been matched by the optimizer.
	// It can be set via a call to the NotifyOnMatchedRule method.
	matchedRule MatchedRuleFunc

	// appliedRule is the callback function which is invoked each time an
	// optimization rule (Normalize or Explore) has been applied by the optimizer.
	// It can be set via a call to the NotifyOnAppliedRule method.
	appliedRule AppliedRuleFunc

	// disabledRules is a set of rules that are not allowed to run, used for
	// testing.
	disabledRules RuleSet

	// JoinOrderBuilder adds new join orderings to the memo.
	jb *JoinOrderBuilder
}

// Init initializes the Optimizer with a new, blank memo structure inside. This
// must be called before the optimizer can be used (or reused).
func (o *Optimizer) Init(evalCtx *tree.EvalContext, catalog cat.Catalog) {
	o.evalCtx = evalCtx
	o.catalog = catalog
	o.f.Init(evalCtx, catalog)
	o.mem = o.f.Memo()
	o.explorer.init(o)
	o.defaultCoster.Init(evalCtx, o.mem, evalCtx.TestingKnobs.OptimizerCostPerturbation)
	o.coster = &o.defaultCoster
	o.stateMap = make(map[groupStateKey]*groupState)
	o.matchedRule = nil
	o.appliedRule = nil
	o.jb = &JoinOrderBuilder{}
	if evalCtx.TestingKnobs.DisableOptimizerRuleProbability > 0 {
		o.disableRules(evalCtx.TestingKnobs.DisableOptimizerRuleProbability)
	}
}

// DetachMemo extracts the memo from the optimizer, and then re-initializes the
// optimizer so that its reuse will not impact the detached memo. This method is
// used to extract a read-only memo during the PREPARE phase.
func (o *Optimizer) DetachMemo() *memo.Memo {
	detach := o.f.DetachMemo()
	o.Init(o.evalCtx, o.catalog)
	return detach
}

// Factory returns a factory interface that the caller uses to construct an
// input expression tree. The root of the resulting tree can be passed to the
// Optimize method in order to find the lowest cost plan.
func (o *Optimizer) Factory() *norm.Factory {
	return &o.f
}

// Coster returns the coster instance that the optimizer is currently using to
// estimate the cost of executing portions of the expression tree. When a new
// optimizer is constructed, it creates a default coster that will be used
// unless it is overridden with a call to SetCoster.
func (o *Optimizer) Coster() Coster {
	return o.coster
}

// SetCoster overrides the default coster. The optimizer will now use the given
// coster to estimate the cost of expression execution.
func (o *Optimizer) SetCoster(coster Coster) {
	o.coster = coster
}

// DisableOptimizations disables all transformation rules, including normalize
// and explore rules. The unaltered input expression tree becomes the output
// expression tree (because no transforms are applied).
func (o *Optimizer) DisableOptimizations() {
	o.NotifyOnMatchedRule(func(opt.RuleName) bool { return false })
}

// NotifyOnMatchedRule sets a callback function which is invoked each time an
// optimization rule (Normalize or Explore) has been matched by the optimizer.
// If matchedRule is nil, then no notifications are sent, and all rules are
// applied by default. In addition, callers can invoke the DisableOptimizations
// convenience method to disable all rules.
func (o *Optimizer) NotifyOnMatchedRule(matchedRule MatchedRuleFunc) {
	o.matchedRule = matchedRule

	// Also pass through the call to the factory so that normalization rules
	// make same callback.
	o.f.NotifyOnMatchedRule(matchedRule)
}

// NotifyOnAppliedRule sets a callback function which is invoked each time an
// optimization rule (Normalize or Explore) has been applied by the optimizer.
// If appliedRule is nil, then no further notifications are sent.
func (o *Optimizer) NotifyOnAppliedRule(appliedRule AppliedRuleFunc) {
	o.appliedRule = appliedRule

	// Also pass through the call to the factory so that normalization rules
	// make same callback.
	o.f.NotifyOnAppliedRule(appliedRule)
}

// Memo returns the memo structure that the optimizer is using to optimize.
func (o *Optimizer) Memo() *memo.Memo {
	return o.mem
}

// Optimize returns the expression which satisfies the required physical
// properties at the lowest possible execution cost, but is still logically
// equivalent to the given expression. If there is a cost "tie", then any one
// of the qualifying lowest cost expressions may be selected by the optimizer.
func (o *Optimizer) Optimize() (_ opt.Expr, err error) {
	defer func() {
		if r := recover(); r != nil {
			// This code allows us to propagate internal errors without having to add
			// error checks everywhere throughout the code. This is only possible
			// because the code does not update shared state and does not manipulate
			// locks.
			if ok, e := errorutil.ShouldCatch(r); ok {
				err = e
			} else {
				// Other panic objects can't be considered "safe" and thus are
				// propagated as crashes that terminate the session.
				panic(r)
			}
		}
	}()

	if o.mem.IsOptimized() {
		return nil, errors.AssertionFailedf("cannot optimize a memo multiple times")
	}

	// Optimize the root expression according to the properties required of it.
	o.optimizeRootWithProps()

	// Now optimize the entire expression tree.
	root := o.mem.RootExpr().(memo.RelExpr)
	rootProps := o.mem.RootProps()
	o.optimizeGroup(root, rootProps)

	// Walk the tree from the root, updating child pointers so that the memo
	// root points to the lowest cost tree by default (rather than the normalized
	// tree by default.
	root = o.setLowestCostTree(root, rootProps).(memo.RelExpr)
	if o.mem.TSSupportAllProcessor() {
		err = o.mem.CheckWhiteListAndAddSynchronize(&root)
		if err != nil {
			return root, err
		}
	} else {
		o.mem.DealTSScan(root.(memo.RelExpr))
	}
	o.mem.SetRoot(root, rootProps)

	// Validate there are no dangling references.
	if !root.Relational().OuterCols.Empty() {
		return nil, errors.AssertionFailedf(
			"top-level relational expression cannot have outer columns: %s",
			errors.Safe(root.Relational().OuterCols),
		)
	}

	return root, nil
}

// optimizeExpr calls either optimizeGroup or optimizeScalarExpr depending on
// the type of the expression (relational or scalar).
func (o *Optimizer) optimizeExpr(
	e opt.Expr, required *physical.Required,
) (cost memo.Cost, fullyOptimized bool) {
	switch t := e.(type) {
	case memo.RelExpr:
		state := o.optimizeGroup(t, required)
		return state.cost, state.fullyOptimized

	case memo.ScalarPropsExpr:
		// Short-circuit traversal of scalar expressions with no nested subquery,
		// since there's only one possible tree.
		if !t.ScalarProps().HasSubquery {
			return 0, true
		}
		return o.optimizeScalarExpr(t)

	case opt.ScalarExpr:
		return o.optimizeScalarExpr(t)

	default:
		panic(errors.AssertionFailedf("unhandled child: %+v", e))
	}
}

// optimizeGroup enumerates expression trees rooted in the given memo group and
// finds the expression tree with the lowest cost (i.e. the "best") that
// provides the given required physical properties. Enforcers are added as
// needed to provide the required properties.
//
// The following is a simplified walkthrough of how the optimizer might handle
// the following SQL query:
//
//	SELECT * FROM a WHERE x=1 ORDER BY y
//
// Before the optimizer is invoked, the memo group contains a single normalized
// expression:
//
//	memo
//	 ├── G1: (select G2 G3)
//	 ├── G2: (scan a)
//	 ├── G3: (eq 3 2)
//	 ├── G4: (variable x)
//	 └── G5: (const 1)
//
// Optimization begins at the root of the memo (group #1), and calls
// optimizeGroup with the properties required of that group ("ordering:y").
// optimizeGroup then calls optimizeGroupMember for the Select expression, which
// checks whether the expression can provide the required properties. Since
// Select is a pass-through operator, it can provide the properties by passing
// through the requirement to its input. Accordingly, optimizeGroupMember
// recursively invokes optimizeGroup on select's input child (group #2), with
// the same set of required properties.
//
// Now the same set of steps are applied to group #2. However, the Scan
// expression cannot provide the required ordering (say because it's ordered on
// x rather than y). The optimizer must add a Sort enforcer. It does this by
// recursively invoking optimizeGroup on the same group #2, but this time
// without the ordering requirement. The Scan operator is capable of meeting
// these reduced requirements, so it is costed and added as the current lowest
// cost expression for that group for that set of properties (i.e. the empty
// set).
//
//	memo
//	 ├── G1: (select G2 G3)
//	 ├── G2: (scan a)
//	 │    └── []
//	 │         ├── best: (scan a)
//	 │         └── cost: 100.00
//	 ├── G3: (eq 3 2)
//	 ├── G4: (variable x)
//	 └── G5: (const 1)
//
// The recursion pops up a level, and now the Sort enforcer knows its input,
// and so it too can be costed (cost of input + extra cost of sort) and added
// as the best expression for the property set with the ordering requirement.
//
//	memo
//	 ├── G1: (select G2 G3)
//	 ├── G2: (scan a)
//	 │    ├── [ordering: y]
//	 │    │    ├── best: (sort G2)
//	 │    │    └── cost: 150.00
//	 │    └── []
//	 │         ├── best: (scan a)
//	 │         └── cost: 100.00
//	 ├── G3: (eq 3 2)
//	 ├── G4: (variable x)
//	 └── G5: (const 1)
//
// Recursion pops up another level, and the Select operator now knows its input
// (the Sort of the Scan). It then moves on to its scalar filter child and
// optimizes it and its descendants, which is relatively uninteresting since
// there are no subqueries to consider (in fact, the optimizer recognizes this
// and skips traversal altogether). Once all children have been optimized, the
// Select operator can now be costed and added as the best expression for the
// ordering requirement. It requires the same ordering requirement from its
// input child (i.e. the scan).
//
//	memo
//	 ├── G1: (select G2 G3)
//	 │    └── [ordering: y]
//	 │         ├── best: (select G2="ordering: y" G3)
//	 │         └── cost: 160.00
//	 ├── G2: (scan a)
//	 │    ├── [ordering: y]
//	 │    │    ├── best: (sort G2)
//	 │    │    └── cost: 150.00
//	 │    └── []
//	 │         ├── best: (scan a)
//	 │         └── cost: 100.00
//	 ├── G3: (eq 3 2)
//	 ├── G4: (variable x)
//	 └── G5: (const 1)
//
// But the process is not yet complete. After traversing the Select child
// groups, optimizeExpr generates an alternate plan that satisfies the ordering
// property by using a top-level enforcer. It does this by recursively invoking
// optimizeGroup for group #1, but without the ordering requirement, analogous
// to what it did for group #2. This triggers optimization for each child group
// of the Select operator. But this time, the memo already has fully-costed
// best expressions available for both the Input and Filter children, and so
// returns them immediately with no extra work. The Select expression is now
// costed and added as the best expression without an ordering requirement.
//
//	memo
//	 ├── G1: (select G2 G3)
//	 │    ├── [ordering: y]
//	 │    │    ├── best: (select G2="ordering: y" G3)
//	 │    │    └── cost: 160.00
//	 │    └── []
//	 │         ├── best: (select G2 G3)
//	 │         └── cost: 110.00
//	 ├── G2: (scan a)
//	 │    ├── [ordering: y]
//	 │    │    ├── best: (sort G2)
//	 │    │    └── cost: 150.00
//	 │    └── []
//	 │         ├── best: (scan a)
//	 │         └── cost: 100.00
//	 ├── G3: (eq 3 2)
//	 ├── G4: (variable x)
//	 └── G5: (const 1)
//
// Finally, the Sort enforcer for group #1 has its input and can be costed. But
// rather than costing 50.0 like the other Sort enforcer, this one only costs
// 1.0, because it's sorting a tiny set of filtered rows. That means its total
// cost is only 111.0, which makes it the new best expression for group #1 with
// an ordering requirement:
//
//	memo
//	 ├── G1: (select G2 G3)
//	 │    ├── [ordering: y]
//	 │    │    ├── best: (sort G1)
//	 │    │    └── cost: 111.00
//	 │    └── []
//	 │         ├── best: (select G2 G3)
//	 │         └── cost: 110.00
//	 ├── G2: (scan a)
//	 │    ├── [ordering: y]
//	 │    │    ├── best: (sort G2)
//	 │    │    └── cost: 150.00
//	 │    └── []
//	 │         ├── best: (scan a)
//	 │         └── cost: 100.00
//	 ├── G3: (eq 3 2)
//	 ├── G4: (variable x)
//	 └── G5: (const 1)
//
// Now the memo has been fully optimized, and the best expression for group #1
// and "ordering:y" can be set as the root of the tree by setLowestCostTree:
//
//	sort
//	 ├── columns: x:1(int) y:2(int)
//	 ├── ordering: +2
//	 └── select
//	      ├── columns: x:1(int) y:2(int)
//	      ├── scan
//	      │    └── columns: x:1(int) y:2(int)
//	      └── eq [type=bool]
//	           ├── variable: a.x [type=int]
//	           └── const: 1 [type=int]
func (o *Optimizer) optimizeGroup(grp memo.RelExpr, required *physical.Required) *groupState {
	// Always start with the first expression in the group.
	grp = grp.FirstExpr()

	// If this group is already fully optimized, then return the already prepared
	// best expression (won't ever get better than this).
	state := o.ensureOptState(grp, required)
	if state.fullyOptimized {
		return state
	}

	// Iterate until the group has been fully optimized.
	for {
		fullyOptimized := true

		for i, member := 0, grp; member != nil; i, member = i+1, member.NextExpr() {
			// If this group member has already been fully optimized for the given
			// required properties, then skip it, since it won't get better.
			if state.isMemberFullyOptimized(i) {
				continue
			}

			// Optimize the group member with respect to the required properties.
			memberOptimized := o.optimizeGroupMember(state, member, required)

			// If any of the group members have not yet been fully optimized, then
			// the group is not yet fully optimized.
			if memberOptimized {
				state.markMemberAsFullyOptimized(i)
			} else {
				fullyOptimized = false
			}
		}

		// Now try to generate new expressions that are logically equivalent to
		// other expressions in this group.
		if o.shouldExplore(required) && !o.explorer.exploreGroup(grp).fullyExplored {
			fullyOptimized = false
		}

		if fullyOptimized {
			state.fullyOptimized = true
			break
		}
	}

	if o.evalCtx.HintStmtEmbed || o.evalCtx.HintID > 0 {
		o.checkHintApply(state)
	}

	return state
}

// optimizeGroupMember determines whether the group member expression can
// provide the required properties. If so, it recursively optimizes the
// expression's child groups and computes the cost of the expression. In
// addition, optimizeGroupMember calls enforceProps to check whether enforcers
// can provide the required properties at a lower cost. The lowest cost
// expression is saved to groupState.
func (o *Optimizer) optimizeGroupMember(
	state *groupState, member memo.RelExpr, required *physical.Required,
) (fullyOptimized bool) {
	// Compute the cost for enforcers to provide the required properties. This
	// may be lower than the expression providing the properties itself. For
	// example, it might be better to sort the results of a hash join than to
	// use the results of a merge join that are already sorted, but at the cost
	// of requiring one of the merge join children to be sorted.
	fullyOptimized = o.enforceProps(state, member, required)

	// If the expression cannot provide the required properties, then don't
	// continue. But what if the expression is able to provide a subset of the
	// properties? That case is taken care of by enforceProps, which will
	// recursively optimize the group with property subsets and then add
	// enforcers to provide the remainder.
	if CanProvidePhysicalProps(member, required) {
		var cost memo.Cost
		for i, n := 0, member.ChildCount(); i < n; i++ {
			// Given required parent properties, get the properties required from
			// the nth child.
			childRequired := BuildChildPhysicalProps(o.mem, member, i, required)

			// Optimize the child with respect to those properties.
			childCost, childOptimized := o.optimizeExpr(member.Child(i), childRequired)

			// Accumulate cost of children.
			cost += childCost

			// If any child expression is not fully optimized, then the parent
			// expression is also not fully optimized.
			if !childOptimized {
				fullyOptimized = false
			}
		}

		// Check whether this is the new lowest cost expression.
		cost += o.coster.ComputeCost(member, required)
		o.ratchetCost(state, member, cost)
	}

	// If it is the filter layer of the time series scenario,
	// we need to consider adjusting the order of filtering conditions
	// to speed up the execution of queries.
	if opt.CheckOptMode(opt.TSQueryOptMode.Get(&o.evalCtx.Settings.SV), opt.FilterOptOrder) &&
		o.mem.TSSupportAllProcessor() &&
		o.mem.CheckFlag(opt.IncludeTSTable) {
		if selectExpr, ok := member.(*memo.SelectExpr); ok {
			o.mem.SortFilters(selectExpr, member.Relational())
		}
	}
	return fullyOptimized
}

// optimizeScalarExpr recursively optimizes the children of a scalar expression.
// This is only necessary when the scalar expression contains a subquery, since
// scalar expressions otherwise always have zero cost and only one possible
// plan.
func (o *Optimizer) optimizeScalarExpr(
	scalar opt.ScalarExpr,
) (cost memo.Cost, fullyOptimized bool) {
	fullyOptimized = true
	for i, n := 0, scalar.ChildCount(); i < n; i++ {
		childProps := BuildChildPhysicalPropsScalar(o.mem, scalar, i)
		childCost, childOptimized := o.optimizeExpr(scalar.Child(i), childProps)

		// Accumulate cost of children.
		cost += childCost

		// If any child expression is not fully optimized, then the parent
		// expression is also not fully optimized.
		if !childOptimized {
			fullyOptimized = false
		}
	}
	return cost, fullyOptimized
}

// enforceProps costs an expression where one of the physical properties has
// been provided by an enforcer rather than by the expression itself. There are
// two reasons why this is necessary/desirable:
//
//  1. The expression may not be able to provide the property on its own. For
//     example, a hash join cannot provide ordered results.
//  2. The enforcer might be able to provide the property at lower overall
//     cost. For example, an enforced sort on top of a hash join might be
//     lower cost than a merge join that is already sorted, but at the cost of
//     requiring one of its children to be sorted.
//
// Note that enforceProps will recursively optimize this same group, but with
// one less required physical property. The recursive call will eventually make
// its way back here, at which point another physical property will be stripped
// off, and so on. Afterwards, the group will have computed a lowest cost
// expression for each sublist of physical properties, from all down to none.
//
// Right now, the only physical property that can be provided by an enforcer is
// physical.Required.Ordering. When adding another enforceable property, also
// update shouldExplore, which should return true if enforceProps will explore
// the group by recursively calling optimizeGroup (by way of optimizeEnforcer).
func (o *Optimizer) enforceProps(
	state *groupState, member memo.RelExpr, required *physical.Required,
) (fullyOptimized bool) {
	// Strip off one property that can be enforced. Other properties will be
	// stripped by recursively optimizing the group with successively fewer
	// properties. The properties are stripped off in a heuristic order, from
	// least likely to be expensive to enforce to most likely.
	if !required.Ordering.Any() {
		// Try Sort enforcer that requires no ordering from its input.
		enforcer := &memo.SortExpr{Input: member}
		memberProps := BuildChildPhysicalProps(o.mem, enforcer, 0, required)
		fullyOptimized = o.optimizeEnforcer(state, enforcer, required, member, memberProps)

		// Try Sort enforcer that requires a partial ordering from its input.
		longestCommonPrefix := deriveInterestingOrderingPrefix(member, required.Ordering)
		if len(longestCommonPrefix) > 0 && len(longestCommonPrefix) < len(required.Ordering.Columns) {
			enforcer := &memo.SortExpr{Input: state.best}
			enforcer.InputOrdering.FromOrdering(longestCommonPrefix)
			memberProps := BuildChildPhysicalProps(o.mem, enforcer, 0, required)
			if o.optimizeEnforcer(state, enforcer, required, member, memberProps) {
				fullyOptimized = true
			}
		}

		return fullyOptimized
	}

	return true
}

// deriveInterestingOrderingPrefix finds the longest prefix of the required ordering
// that is "interesting" as defined in Relational.Rule.InterestingOrderings.
func deriveInterestingOrderingPrefix(
	member memo.RelExpr, requiredOrdering physical.OrderingChoice,
) opt.Ordering {
	// Find the interesting orderings of the member expression.
	interestingOrderings := DeriveInterestingOrderings(member)

	// Find the longest interesting ordering that is a prefix of the required ordering.
	var longestCommonPrefix opt.Ordering
	for _, ordering := range interestingOrderings {
		var commonPrefix opt.Ordering
		for i, orderingCol := range ordering {
			if i < len(requiredOrdering.Columns) &&
				requiredOrdering.Columns[i].Group.Contains(orderingCol.ID()) &&
				requiredOrdering.Columns[i].Descending == orderingCol.Descending() {
				commonPrefix = append(commonPrefix, orderingCol)
			} else {
				break
			}
		}
		if len(commonPrefix) > len(longestCommonPrefix) {
			longestCommonPrefix = commonPrefix
		}
	}
	return longestCommonPrefix
}

// optimizeEnforcer optimizes and costs the enforcer.
func (o *Optimizer) optimizeEnforcer(
	state *groupState,
	enforcer memo.RelExpr,
	enforcerProps *physical.Required,
	member memo.RelExpr,
	memberProps *physical.Required,
) (fullyOptimized bool) {
	// Recursively optimize the member group with respect to a subset of the
	// enforcer properties.
	innerState := o.optimizeGroup(member, memberProps)
	fullyOptimized = innerState.fullyOptimized

	// Check whether this is the new lowest cost expression with the enforcer
	// added.
	cost := innerState.cost + o.coster.ComputeCost(enforcer, enforcerProps)
	if enforcerProps.MustAddSort {
		cost = 0
	}
	o.ratchetCost(state, enforcer, cost)

	// Enforcer expression is fully optimized if its input expression is fully
	// optimized.
	return fullyOptimized
}

// shouldExplore ensures that exploration is only triggered for optimizeGroup
// calls that will not recurse via a call from enforceProps.
func (o *Optimizer) shouldExplore(required *physical.Required) bool {
	return required.Ordering.Any()
}

// setLowestCostTree traverses the memo and recursively updates child pointers
// so that they point to the lowest cost expression tree rather than to the
// normalized expression tree. Each participating memo group is updated to store
// the physical properties required of it, as well as the estimated cost of
// executing the lowest cost expression in the group. As an example, say this is
// the memo, with a normalized tree containing the first expression in each of
// the groups:
//
//	memo
//	 ├── G1: (inner-join G2 G3 G4) (inner-join G3 G2 G4)
//	 ├── G2: (scan a)
//	 ├── G3: (select G5 G6) (scan b,constrained)
//	 ├── G4: (true)
//	 ├── G5: (scan b)
//	 ├── G6: (eq G7 G8)
//	 ├── G7: (variable b.x)
//	 └── G8: (const 1)
//
// setLowestCostTree is called after exploration is complete, and after each
// group member has been costed. If the second expression in groups G1 and G3
// have lower cost than the first expressions, then setLowestCostTree will
// update pointers so that the root expression tree includes those expressions
// instead.
//
// Note that a memo group can have multiple lowest cost expressions, each for a
// different set of physical properties. During optimization, these are retained
// in the groupState map. However, only one of those lowest cost expressions
// will be used in the final tree; the others are simply discarded. This is
// because there is never a case where a relational expression is referenced
// multiple times in the final tree, but with different physical properties
// required by each of those references.
func (o *Optimizer) setLowestCostTree(parent opt.Expr, parentProps *physical.Required) opt.Expr {
	var relParent memo.RelExpr
	var relCost memo.Cost
	switch t := parent.(type) {
	case memo.RelExpr:
		state := o.lookupOptState(t.FirstExpr(), parentProps)
		relParent, relCost = state.best, state.cost
		parent = relParent

	case memo.ScalarPropsExpr:
		// Short-circuit traversal of scalar expressions with no nested subquery,
		// since there's only one possible tree.
		if !t.ScalarProps().HasSubquery {
			return parent
		}
	}

	// Iterate over the expression's children, replacing any that have a lower
	// cost alternative.
	var mutable opt.MutableExpr
	var childProps *physical.Required
	for i, n := 0, parent.ChildCount(); i < n; i++ {
		before := parent.Child(i)

		if relParent != nil {
			childProps = BuildChildPhysicalProps(o.mem, relParent, i, parentProps)
		} else {
			childProps = BuildChildPhysicalPropsScalar(o.mem, parent, i)
		}

		after := o.setLowestCostTree(before, childProps)
		if after != before {
			if mutable == nil {
				mutable = parent.(opt.MutableExpr)
			}
			mutable.SetChild(i, after)
		}
	}

	if parent.Op() == opt.TSScanOp {
		// handle tsScan's children tagFilters and primaryTagFilters, set lowest cost
		tsScan, _ := parent.(*memo.TSScanExpr)
		for _, before := range tsScan.TagFilter {
			o.setLowestCostTree(before, childProps)
		}
		for _, before := range tsScan.PrimaryTagFilter {
			o.setLowestCostTree(before, childProps)
		}
	}

	if relParent != nil {
		var provided physical.Provided
		// BuildProvided relies on ProvidedPhysical() being set in the children, so
		// it must run after the recursive calls on the children.
		provided.Ordering = ordering.BuildProvided(relParent, &parentProps.Ordering)
		o.mem.SetBestProps(relParent, parentProps, &provided, relCost)
	}

	return parent
}

// ratchetCost computes the cost of the candidate expression, and then checks
// whether it's lower than the cost of the existing best expression in the
// group. If so, then the candidate becomes the new lowest cost expression.
func (o *Optimizer) ratchetCost(state *groupState, candidate memo.RelExpr, cost memo.Cost) {
	if state.best == nil || cost.Less(state.cost) {
		state.best = candidate
		state.cost = cost
	}
}

// lookupOptState looks up the state associated with the given group and
// properties. If no state exists yet, then lookupOptState returns nil.
func (o *Optimizer) lookupOptState(grp memo.RelExpr, required *physical.Required) *groupState {
	return o.stateMap[groupStateKey{group: grp, required: required}]
}

// ensureOptState looks up the state associated with the given group and
// properties. If none is associated yet, then ensureOptState allocates new
// state and returns it.
func (o *Optimizer) ensureOptState(grp memo.RelExpr, required *physical.Required) *groupState {
	key := groupStateKey{group: grp, required: required}
	state, ok := o.stateMap[key]
	if !ok {
		state = o.stateAlloc.allocate()
		state.required = required
		o.stateMap[key] = state
	}
	return state
}

// if expr contains tsInsertSelectExpr, deal with expr.input
func dealWithTsInsertSelect(expr memo.RelExpr) (memo.RelExpr, bool) {
	if e, ok := expr.(*memo.TSInsertSelectExpr); ok {
		return e.Input, true
	}
	// with ... as ...
	if e1, ok1 := expr.(*memo.WithExpr); ok1 {
		if e2, ok2 := e1.Main.(*memo.TSInsertSelectExpr); ok2 {
			return e2.Input, true
		}
	}
	return expr, false
}

// optimizeRootWithProps tries to simplify the root operator based on the
// properties required of it. This may trigger the creation of a new root and
// new properties.
func (o *Optimizer) optimizeRootWithProps() {
	root, ok := o.mem.RootExpr().(memo.RelExpr)
	if !ok {
		panic(errors.AssertionFailedf("Optimize can only be called on relational root expressions"))
	}

	// rootProps is bestExpr of subexpression in time series insert select,
	// handle TSInsertSelectExpr. execute some sql maybe stack error
	// so need to simplify subexpression
	// eg: insert into test_ts.ts_table2
	// select * from test_sts.stable as t1 where
	//	e1 in (select e1 from test_ts.ts_table where e1>1000 and t1.e2 < 2000 limit 1 offset 1);
	var isTsInsSel bool
	root, isTsInsSel = dealWithTsInsertSelect(root)
	rootProps := o.mem.RootProps()

	// [SimplifyRootOrdering]
	// SimplifyRootOrdering removes redundant columns from the root properties,
	// based on the operator's functional dependencies.
	// ps: simplifyRootOrdering is forbiddened if it is tsInsertSelect
	if rootProps.Ordering.CanSimplify(&root.Relational().FuncDeps) && !isTsInsSel {
		if o.matchedRule == nil || o.matchedRule(opt.SimplifyRootOrdering) {
			simplified := *rootProps
			simplified.Ordering = rootProps.Ordering.Copy()
			simplified.Ordering.Simplify(&root.Relational().FuncDeps)
			o.mem.SetRoot(root, &simplified)
			rootProps = o.mem.RootProps()
			if o.appliedRule != nil {
				o.appliedRule(opt.SimplifyRootOrdering, nil, root)
			}
		}
	}

	// [PruneRootCols]
	// PruneRootCols discards columns that are not needed by the root's ordering
	// or presentation properties.
	neededCols := rootProps.ColSet()
	if !neededCols.SubsetOf(root.Relational().OutputCols) {
		panic(errors.AssertionFailedf(
			"columns required of root %s must be subset of output columns %s",
			neededCols,
			root.Relational().OutputCols,
		))
	}
	if o.f.CustomFuncs().CanPruneCols(root, neededCols) {
		if o.matchedRule == nil || o.matchedRule(opt.PruneRootCols) {
			root = o.f.CustomFuncs().PruneCols(root, neededCols)
			// We may have pruned a column that appears in the required ordering.
			rootCols := root.Relational().OutputCols
			if !rootProps.Ordering.SubsetOfCols(rootCols) {
				newProps := *rootProps
				newProps.Ordering = rootProps.Ordering.Copy()
				newProps.Ordering.ProjectCols(rootCols)
				o.mem.SetRoot(root, &newProps)
				//lint:ignore SA4006 set rootProps in case another rule is added below.
				rootProps = o.mem.RootProps()
			} else {
				o.mem.SetRoot(root, rootProps)
			}
			if o.appliedRule != nil {
				o.appliedRule(opt.PruneRootCols, nil, root)
			}
		}
	}
}

// groupStateKey associates groupState with a group that is being optimized with
// respect to a set of physical properties.
type groupStateKey struct {
	group    memo.RelExpr
	required *physical.Required
}

// groupState is temporary storage that's associated with each group that's
// optimized (or same group with different sets of physical properties). The
// optimizer stores various flags and other state here that allows it to do
// quicker lookups and short-circuit already traversed parts of the expression
// tree.
type groupState struct {
	// best identifies the lowest cost expression in the memo group for a given
	// set of physical properties.
	best memo.RelExpr

	// required is the set of physical properties that must be provided by this
	// lowest cost expression. An expression that cannot provide these properties
	// cannot be the best expression, no matter how low its cost.
	required *physical.Required

	// cost is the estimated execution cost for this expression. The best
	// expression for a given group and set of physical properties is the
	// expression with the lowest cost.
	cost memo.Cost

	// fullyOptimized is set to true once the lowest cost expression has been
	// found for a memo group, with respect to the required properties. A lower
	// cost expression will never be found, no matter how many additional
	// optimization passes are made.
	fullyOptimized bool

	// fullyOptimizedExprs contains the set of ordinal positions of each member
	// expression in the group that has been fully optimized for the required
	// properties. These never need to be recosted, no matter how many additional
	// optimization passes are made.
	fullyOptimizedExprs util.FastIntSet

	// explore is used by the explorer to store intermediate state so that
	// redundant work is minimized.
	explore exploreState
}

// isMemberFullyOptimized returns true if the group member at the given ordinal
// position has been fully optimized for the required properties. The expression
// never needs to be recosted, no matter how many additional optimization passes
// are made.
func (os *groupState) isMemberFullyOptimized(ord int) bool {
	return os.fullyOptimizedExprs.Contains(ord)
}

// markMemberAsFullyOptimized marks the group member at the given ordinal
// position as fully optimized for the required properties. The expression never
// needs to be recosted, no matter how many additional optimization passes are
// made.
func (os *groupState) markMemberAsFullyOptimized(ord int) {
	if os.fullyOptimized {
		panic(errors.AssertionFailedf("best expression is already fully optimized"))
	}
	if os.isMemberFullyOptimized(ord) {
		panic(errors.AssertionFailedf("memo expression is already fully optimized for required physical properties"))
	}
	os.fullyOptimizedExprs.Add(ord)
}

// groupStateAlloc allocates pages of groupState structs. This is preferable to
// a slice of groupState structs because pointers are not invalidated when a
// resize occurs, and because there's no need to retain a stable index.
type groupStateAlloc struct {
	page []groupState
}

// allocate returns a pointer to a new, empty groupState struct. The pointer is
// stable, meaning that its location won't change as other groupState structs
// are allocated.
func (a *groupStateAlloc) allocate() *groupState {
	if len(a.page) == 0 {
		a.page = make([]groupState, 8)
	}
	state := &a.page[0]
	a.page = a.page[1:]
	return state
}

// disableRules disables rules with the given probability for testing.
func (o *Optimizer) disableRules(probability float64) {
	essentialRules := util.MakeFastIntSet(
		// Needed to prevent constraint building from failing.
		int(opt.NormalizeInConst),
		// Needed when an index is forced.
		int(opt.GenerateIndexScans),
		// Needed to prevent "same fingerprint cannot map to different groups."
		int(opt.PruneJoinLeftCols),
		int(opt.PruneJoinRightCols),
		// Needed to prevent stack overflow.
		int(opt.PushFilterIntoJoinLeftAndRight),
		int(opt.PruneSelectCols),
		// Needed to prevent execbuilder error.
		// TODO(radu): the DistinctOn execution path should be fixed up so it
		// supports distinct on an empty column set.
		int(opt.EliminateDistinctNoColumns),
		int(opt.EliminateErrorDistinctNoColumns),
	)

	for i := opt.RuleName(1); i < opt.NumRuleNames; i++ {
		if rand.Float64() < probability && !essentialRules.Contains(int(i)) {
			o.disabledRules.Add(int(i))
		}
	}

	o.NotifyOnMatchedRule(func(ruleName opt.RuleName) bool {
		if o.disabledRules.Contains(int(ruleName)) {
			log.Infof(o.evalCtx.Context, "disabled rule matched: %s", ruleName.String())
			return false
		}
		return true
	})
}

func (o *Optimizer) String() string {
	return o.FormatMemo(FmtPretty)
}

// FormatMemo returns a string representation of the memo for testing
// and debugging. The given flags control which properties are shown.
func (o *Optimizer) FormatMemo(flags FmtFlags) string {
	mf := makeMemoFormatter(o, flags)
	return mf.format()
}

// RecomputeCost recomputes the cost of each expression in the lowest cost
// tree. It should be used in combination with the perturb-cost OptTester flag
// in order to update the query plan tree after optimization is complete with
// the real computed cost, not the perturbed cost.
func (o *Optimizer) RecomputeCost() {
	var c coster
	c.Init(o.evalCtx, o.mem, 0 /* perturbation */)

	root := o.mem.RootExpr()
	rootProps := o.mem.RootProps()
	o.recomputeCostImpl(root, rootProps, &c)
}

func (o *Optimizer) recomputeCostImpl(
	parent opt.Expr, parentProps *physical.Required, c Coster,
) memo.Cost {
	cost := memo.Cost(0)
	for i, n := 0, parent.ChildCount(); i < n; i++ {
		child := parent.Child(i)
		childProps := physical.MinRequired
		switch t := child.(type) {
		case memo.RelExpr:
			childProps = t.RequiredPhysical()
		}
		cost += o.recomputeCostImpl(child, childProps, c)
	}

	switch t := parent.(type) {
	case memo.RelExpr:
		cost += c.ComputeCost(t, parentProps)
		o.mem.ResetCost(t, cost)
	}

	return cost
}

// FormatExpr is a convenience wrapper for memo.FormatExpr.
func (o *Optimizer) FormatExpr(e opt.Expr, flags memo.ExprFmtFlags) string {
	return memo.FormatExpr(e, flags, o.mem, o.catalog)
}

// InitTSRequired init time series required optimizer
func (o *Optimizer) InitTSRequired(parent opt.Expr, parentProps *physical.Required) {
	var relParent memo.RelExpr
	var relCost memo.Cost
	switch t := parent.(type) {
	case memo.RelExpr:
		relParent = t
		parent = relParent
	}
	for i, n := 0, parent.ChildCount(); i < n; i++ {
		before := parent.Child(i)

		childProps := &physical.Required{}
		o.InitTSRequired(before, childProps)
	}
	if relParent != nil {
		var provided physical.Provided
		o.mem.SetBestProps(relParent, parentProps, &provided, relCost)
	}
}

// checkHintApply checks whether the hint is applied(include external and embedded)
func (o *Optimizer) checkHintApply(state *groupState) {
	if !state.fullyOptimized {
		return
	}
	e := state.best
	switch t := e.(type) {
	case *memo.ScanExpr:
		if t.Flags.HintType == keys.NoScanHint {
			break
		}
		if t.Flags.HintType == keys.ForceTableScan && t.Index != cat.PrimaryIndex {
			panic(pgerror.New(pgcode.ExternalApplyTableScanForce, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
		}
		if t.Flags.HintType == keys.UseTableScan && t.Index != cat.PrimaryIndex {
			panic(pgerror.New(pgcode.ExternalApplyTableScanUse, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
		}
		if t.Flags.HintType == keys.IgnoreTableScan && t.Index == cat.PrimaryIndex {
			panic(pgerror.New(pgcode.ExternalApplyTableScanIgnore, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
		}
		if t.Flags.HintType == keys.ForceIndexOnly && t.Flags.HintIndexes[0] != t.Index {
			panic(pgerror.New(pgcode.ExternalApplyIndexOnlyForce, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
		}
		if t.Flags.HintType == keys.ForceIndexScan && t.Flags.HintIndexes[0] != t.Index {
			panic(pgerror.New(pgcode.ExternalApplyIndexScanForce, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
		}
		if t.Flags.HintType == keys.UseIndexOnly {
			find := false
			for _, index := range t.Flags.HintIndexes {
				if t.Index == index {
					find = true
					break
				}
			}
			if !find {
				panic(pgerror.New(pgcode.ExternalApplyIndexOnlyUse, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
			}
			break
		}
		if t.Flags.HintType == keys.UseIndexScan {
			find := false
			for _, index := range t.Flags.HintIndexes {
				if t.Index == index {
					find = true
					break
				}
			}
			if !find {
				panic(pgerror.New(pgcode.ExternalApplyIndexScanUse, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
			}
			break
		}

		if t.Flags.HintType == keys.IgnoreIndexOnly {
			for _, index := range t.Flags.HintIndexes {
				if t.Index == index {
					panic(pgerror.New(pgcode.ExternalApplyIndexOnlyIgnore, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
				}
			}
			break
		}
		if t.Flags.HintType == keys.IgnoreIndexScan {
			for _, index := range t.Flags.HintIndexes {
				if t.Index == index {
					panic(pgerror.New(pgcode.ExternalApplyIndexScanIgnore, "stmt hint err: error use hint: "+t.Flags.HintType.String()))
				}
			}
			break
		}
	case *memo.MergeJoinExpr:
		if t.HintInfo.DisallowMergeJoin {
			panic(pgerror.New(pgcode.ExternalApplyMergeJoinDisallow, "stmt hint err: DisallowMergeJoin is true, but MergeJoinExpr was generated."))
		}

	case *memo.LookupJoinExpr:
		if t.HintInfo.DisallowLookupJoin {
			panic(pgerror.New(pgcode.ExternalApplyLookupJoinDisallow, "stmt hint err: DisallowLookupJoin is true, but LookupJoinExpr was generated."))
		}
	case *memo.InnerJoinExpr:
		if t.HintInfo.DisallowHashJoin {
			panic(pgerror.New(pgcode.ExternalApplyHashJoinDisallow, "stmt hint err: DisallowHashJoin is true, but HashJoinExpr was generated."))
		}

	default:
		break
	}
}

// walk() return true if there have TSScanExpr.
// expr is a memo tree.
func walk(expr opt.Expr) bool {
	if expr.Op() == opt.TSScanOp {
		return true
	}

	// case: select * from table,device; when expr is scan should stop walk.
	if expr.ChildCount() < 1 {
		return false
	}
	e := expr.Child(0)
	return walk(e)
}

// IndexMapping defines the mapping for indexes associated with a specific table
// for multi-model processing.
type IndexMapping struct {
	TableIndex opt.TableID
	IndexID    []uint32
}

// CheckMultiModel serves as the entry point for checking all expressions
// in the memo to ensure they comply with the requirements
// for multi-model processing.
func (o *Optimizer) CheckMultiModel() {
	mem := o.mem
	root := mem.RootExpr().(memo.RelExpr)
	processRootExpr(root, mem)
	processRemainingRelTables(mem)
}

// isTSScanOrSelectTSScan is a helper function to check if the expression is a TSScanExpr or a SelectExpr with a TSScanExpr input
// for multiple model processing
func isTSScanOrSelectTSScan(expr memo.RelExpr) bool {
	switch e := expr.(type) {
	case *memo.Max1RowExpr:
		return isTSScanOrSelectTSScan(e.Input)
	case *memo.ProjectExpr:
		return isTSScanOrSelectTSScan(e.Input)
	case *memo.TSScanExpr:
		return true
	case *memo.SelectExpr:
		if _, ok := e.Input.(*memo.TSScanExpr); ok {
			return true
		}
	}
	return false
}

// processRootExpr examines the expressions within the memo structure to determine
// if the multi-model flag needs to be reset.
// for multiple model processing
func processRootExpr(root memo.RelExpr, mem *memo.Memo) {
	allTables := mem.Metadata().AllTables()
	switch expr := root.(type) {
	case *memo.ProjectExpr:
		processRootExpr(expr.Input, mem)
		for _, proj := range expr.Projections {
			if castExpr, ok := proj.Element.(*memo.CastExpr); ok {
				if variableExpr, ok := castExpr.Input.(*memo.VariableExpr); ok {
					colID := variableExpr.Col
					tableID := mem.Metadata().ColumnMeta(colID).Table
					tableGroupID := mem.MultimodelHelper.GetTSTableIndexFromGroup(tableID)
					if tableGroupID >= 0 {
						mem.MultimodelHelper.PreGroupInfos[tableGroupID].ProjectExpr = expr
					}
				}
			}
			if functionExpr, ok := proj.Element.(*memo.FunctionExpr); ok {
				if functionExpr.FunctionPrivate.Name == "time_bucket" {
					if len(functionExpr.Args) > 0 {
						if variableExpr, ok := functionExpr.Args[0].(*memo.VariableExpr); ok {
							colID := variableExpr.Col
							tableID := mem.Metadata().ColumnMeta(colID).Table
							tableGroupID := mem.MultimodelHelper.GetTableIndexFromGroup(tableID)
							if tableGroupID >= 0 {
								aggColID := proj.Col
								if !containsColumn(mem.MultimodelHelper.PreGroupInfos[tableGroupID].GroupingCols, aggColID) {
									mem.MultimodelHelper.PreGroupInfos[tableGroupID].GroupingCols = append(
										mem.MultimodelHelper.PreGroupInfos[tableGroupID].GroupingCols, aggColID)
								}
								mem.MultimodelHelper.PreGroupInfos[tableGroupID].HasTimeBucket = true
								mem.MultimodelHelper.PreGroupInfos[tableGroupID].ProjectExpr = expr
							}
						}
					}
				}
			}
		}
	case *memo.SelectExpr:
		processRootExpr(expr.Input, mem)
	case *memo.DistinctOnExpr:
		processRootExpr(expr.Input, mem)
	case *memo.UpsertDistinctOnExpr:
		processRootExpr(expr.Input, mem)
	case *memo.LimitExpr:
		processRootExpr(expr.Input, mem)
	case *memo.OffsetExpr:
		processRootExpr(expr.Input, mem)
	case *memo.SortExpr:
		processRootExpr(expr.Input, mem)
	case *memo.IndexJoinExpr:
		processRootExpr(expr.Input, mem)
	case *memo.LookupJoinExpr:
		processRootExpr(expr.Input, mem)
	case *memo.OrdinalityExpr:
		processRootExpr(expr.Input, mem)
	case *memo.Max1RowExpr:
		processRootExpr(expr.Input, mem)
	case *memo.ProjectSetExpr:
		processRootExpr(expr.Input, mem)
	case *memo.WindowExpr:
		processRootExpr(expr.Input, mem)
	case *memo.InsertExpr:
		processRootExpr(expr.Input, mem)
	case *memo.UpdateExpr:
		processRootExpr(expr.Input, mem)
	case *memo.UpsertExpr:
		processRootExpr(expr.Input, mem)
	case *memo.DeleteExpr:
		processRootExpr(expr.Input, mem)
	case *memo.CreateTableExpr:
		processRootExpr(expr.Input, mem)
	case *memo.ExplainExpr:
		processRootExpr(expr.Input, mem)
	case *memo.AlterTableSplitExpr:
		processRootExpr(expr.Input, mem)
	case *memo.AlterTableUnsplitExpr:
		processRootExpr(expr.Input, mem)
	case *memo.AlterTableRelocateExpr:
		processRootExpr(expr.Input, mem)
	case *memo.ControlJobsExpr:
		processRootExpr(expr.Input, mem)
	case *memo.CancelQueriesExpr:
		processRootExpr(expr.Input, mem)
	case *memo.CancelSessionsExpr:
		processRootExpr(expr.Input, mem)
	case *memo.ExportExpr:
		processRootExpr(expr.Input, mem)
	case *memo.TSInsertSelectExpr:
		processRootExpr(expr.Input, mem)
	case *memo.GroupByExpr, *memo.ScalarGroupByExpr:
		processGroupByOrScalarGroupByExpr(expr, mem)
	case *memo.InnerJoinExpr:
		leftIsTSScan := isTSScanOrSelectTSScan(expr.Left)
		rightIsTSScan := isTSScanOrSelectTSScan(expr.Right)
		if leftIsTSScan && rightIsTSScan {
			mem.QueryType = memo.Unset
		}

		joinPredicates := expr.On

		for _, jp := range joinPredicates {
			if _, ok := jp.Condition.(*memo.OrExpr); ok {
				mem.QueryType = memo.Unset
				mem.MultimodelHelper.ResetReasons[memo.UnsupportedCrossJoin] = struct{}{}
			} else if mem.IsTsColsJoinPredicate(jp) {
				mem.QueryType = memo.Unset
			}
		}

		indexMapping := make(map[opt.TableID][][]opt.ColumnID)
		for _, table := range allTables {
			tableID := table.MetaID
			for i := 0; i < table.Table.IndexCount(); i++ {
				index := table.Table.Index(i)
				if index.IsUnique() {
					uint32IDs := index.IndexColumnIDs()
					columnIDs := make([]opt.ColumnID, len(uint32IDs))
					for j, id := range uint32IDs {
						columnIDs[j] = opt.ColumnID(id)
					}
					indexMapping[tableID] = append(indexMapping[tableID], columnIDs)
				}
			}
		}

		processJoinRelations(joinPredicates, expr.Left, expr.Right, mem)

		processRootExpr(expr.Left, mem)
		processRootExpr(expr.Right, mem)
	case *memo.SemiJoinExpr:
		leftIsTSScan := isTSScanOrSelectTSScan(expr.Left)
		rightIsTSScan := isTSScanOrSelectTSScan(expr.Right)
		if leftIsTSScan || rightIsTSScan {
			mem.QueryType = memo.Unset
			mem.MultimodelHelper.ResetReasons[memo.UnsupportedSemiJoin] = struct{}{}
		}
		processRootExpr(expr.Left, mem)
		processRootExpr(expr.Right, mem)
	case *memo.SemiJoinApplyExpr:
		leftIsTSScan := isTSScanOrSelectTSScan(expr.Left)
		rightIsTSScan := isTSScanOrSelectTSScan(expr.Right)
		if leftIsTSScan || rightIsTSScan {
			mem.QueryType = memo.Unset
			mem.MultimodelHelper.ResetReasons[memo.UnsupportedSemiJoin] = struct{}{}
		}
		processRootExpr(expr.Left, mem)
		processRootExpr(expr.Right, mem)
	case *memo.AntiJoinExpr:
		leftIsTSScan := isTSScanOrSelectTSScan(expr.Left)
		rightIsTSScan := isTSScanOrSelectTSScan(expr.Right)
		if leftIsTSScan || rightIsTSScan {
			mem.QueryType = memo.Unset
			mem.MultimodelHelper.ResetReasons[memo.UnsupportedSemiJoin] = struct{}{}
		}
		processRootExpr(expr.Left, mem)
		processRootExpr(expr.Right, mem)
	case *memo.AntiJoinApplyExpr:
		leftIsTSScan := isTSScanOrSelectTSScan(expr.Left)
		rightIsTSScan := isTSScanOrSelectTSScan(expr.Right)
		if leftIsTSScan || rightIsTSScan {
			mem.QueryType = memo.Unset
			mem.MultimodelHelper.ResetReasons[memo.UnsupportedSemiJoin] = struct{}{}
		}
		processRootExpr(expr.Left, mem)
		processRootExpr(expr.Right, mem)
	case *memo.LeftJoinApplyExpr:
		leftIsTSScan := isTSScanOrSelectTSScan(expr.Left)
		rightIsTSScan := isTSScanOrSelectTSScan(expr.Right)
		if leftIsTSScan || rightIsTSScan {
			mem.QueryType = memo.Unset
			mem.MultimodelHelper.ResetReasons[memo.UnsupportedOuterJoin] = struct{}{}
		}
		processRootExpr(expr.Left, mem)
		processRootExpr(expr.Right, mem)
	case *memo.UnionAllExpr:
		mem.MultimodelHelper.IsUnion = true
		processRootExpr(expr.Left, mem)
		processRootExpr(expr.Right, mem)
	case *memo.UnionExpr:
		mem.MultimodelHelper.IsUnion = true
		processRootExpr(expr.Left, mem)
		processRootExpr(expr.Right, mem)
	case *memo.WithExpr:
		processRootExpr(expr.Binding, mem)
		processRootExpr(expr.Main, mem)
	default:
	}
}

// processRemainingRelTables processes the remaining tables in the memo structure
// and adds them to the table group
func processRemainingRelTables(mem *memo.Memo) {
	metadata := mem.Metadata()
	if len(mem.MultimodelHelper.TableGroup) == 0 {
		var component []opt.TableID
		for _, table := range metadata.AllTables() {
			if metadata.TableMeta(table.MetaID).Table.GetTableType() != tree.TimeseriesTable {
				component = append(component, table.MetaID)
			} else {
				mem.MultimodelHelper.TableGroup = append(mem.MultimodelHelper.TableGroup, []opt.TableID{table.MetaID})
				mem.MultimodelHelper.PreGroupInfos = append(mem.MultimodelHelper.PreGroupInfos, memo.PreGroupInfo{})
				mem.MultimodelHelper.AggNotPushDown = append(mem.MultimodelHelper.AggNotPushDown, false)
				mem.MultimodelHelper.PlanMode = append(mem.MultimodelHelper.PlanMode, memo.Undefined)
			}
		}
		mem.MultimodelHelper.TableGroup[0] = append(mem.MultimodelHelper.TableGroup[0], component...)
	} else {
		for _, table := range metadata.AllTables() {
			tgIdx := mem.MultimodelHelper.GetTSTableIndexFromGroup(table.MetaID)
			if tgIdx == -1 && metadata.TableMeta(table.MetaID).Table.GetTableType() != tree.TimeseriesTable {
				mem.MultimodelHelper.TableGroup[0] = append(mem.MultimodelHelper.TableGroup[0], table.MetaID)
			}
		}
	}
}

func findTSTableID(expr memo.RelExpr, mem *memo.Memo) opt.TableID {
	for i := 0; i < expr.ChildCount(); i++ {
		if v, ok := expr.Child(i).(memo.RelExpr); ok {
			if tsScanExpr, ok := v.(*memo.TSScanExpr); ok {
				return tsScanExpr.TSScanPrivate.Table
			}
			if tableID := findTSTableID(v, mem); tableID != 0 {
				return tableID
			}
		}
	}
	return 0
}

// getColIDOfExpr returns the col ID of aggregation.
func getColIDOfExpr(input opt.Expr, pIdx int, origColID opt.ColumnID) opt.ColumnID {
	switch src := (input).(type) {
	case *memo.VariableExpr:
		return src.Col
	case *memo.AggDistinctExpr:
		return getColIDOfExpr(src.Input, 0, origColID)
	case *memo.CountExpr:
		return getColIDOfExpr(src.Input, 0, origColID)
	case *memo.ProjectExpr:
		if origColID != -1 && len(src.Projections) > pIdx &&
			src.Projections[pIdx].Col == origColID {
			return getColIDOfExpr(src.Projections[pIdx].Element, 0, origColID)
		}
	case *memo.CastExpr:
		return getColIDOfExpr(src.Input, 0, origColID)
	case *memo.CaseExpr:
		return getColIDOfExpr(src.Whens[0], 0, origColID)
	case *memo.WhenExpr:
		return getColIDOfExpr(src.Condition, 0, origColID)
	case *memo.GtExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.LtExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.GeExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.LeExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.EqExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.NeExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.DivExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.AndExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.OrExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.MultExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.PlusExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.MinusExpr:
		colID := getColIDOfExpr(src.Left, 0, origColID)
		if colID == -1 {
			colID = getColIDOfExpr(src.Right, 0, origColID)
		}
		return colID
	case *memo.FunctionExpr:
		for _, arg := range src.Args {
			colID := getColIDOfExpr(arg, 0, origColID)
			if colID != -1 {
				return colID
			}
		}
	default:
	}
	return -1
}

func getColIDsOfExpr(input opt.Expr, pIdx int, origColID opt.ColumnID) []opt.ColumnID {
	var colIDs []opt.ColumnID

	switch src := input.(type) {
	case *memo.VariableExpr:
		colIDs = append(colIDs, src.Col)

	case *memo.AggDistinctExpr, *memo.CountExpr, *memo.CastExpr:
		colIDs = append(colIDs, getColIDsOfExpr(src.Child(0), 0, origColID)...)

	case *memo.ProjectExpr:
		if origColID != -1 && len(src.Projections) > pIdx &&
			src.Projections[pIdx].Col == origColID {
			colIDs = append(colIDs, getColIDsOfExpr(src.Projections[pIdx].Element, 0, origColID)...)
		}

	case *memo.CaseExpr:
		colIDs = append(colIDs, getColIDsOfExpr(src.Whens[0], 0, origColID)...)

	case *memo.WhenExpr:
		colIDs = append(colIDs, getColIDsOfExpr(src.Condition, 0, origColID)...)

	case *memo.GtExpr, *memo.LtExpr, *memo.GeExpr, *memo.LeExpr, *memo.EqExpr, *memo.PlusExpr, *memo.MinusExpr,
		*memo.NeExpr, *memo.DivExpr, *memo.AndExpr, *memo.OrExpr, *memo.MultExpr:
		colIDs = append(colIDs, getColIDsOfExpr(src.Child(0), 0, origColID)...)
		colIDs = append(colIDs, getColIDsOfExpr(src.Child(1), 0, origColID)...)

	case *memo.FunctionExpr:
		for _, arg := range src.Args {
			colIDs = append(colIDs, getColIDsOfExpr(arg, 0, origColID)...)
		}
	}

	return removeDuplicateColIDs(colIDs)
}

func removeDuplicateColIDs(colIDs []opt.ColumnID) []opt.ColumnID {
	colIDMap := make(map[opt.ColumnID]struct{})
	var uniqueColIDs []opt.ColumnID

	for _, colID := range colIDs {
		if colID != -1 {
			if _, exists := colIDMap[colID]; !exists {
				colIDMap[colID] = struct{}{}
				uniqueColIDs = append(uniqueColIDs, colID)
			}
		}
	}
	return uniqueColIDs
}

// processGroupByOrScalarGroupByExpr analyzes a GroupByExpr or ScalarGroupByExpr
// to determine whether multi-model optimization is supported, whether aggregation
// can be pushed down to the TS engine, and to record pre-grouping information.
func processGroupByOrScalarGroupByExpr(expr memo.RelExpr, mem *memo.Memo) {
	processRootExpr(expr.Child(0).(memo.RelExpr), mem)
	processRemainingRelTables(mem)
	unsupportedMultiModel := false
	metaData := mem.Metadata()
	if projectExpr, ok := expr.Child(0).(*memo.ProjectExpr); ok {
		for _, proj := range projectExpr.Projections {
			if element, ok := proj.Element.(opt.Expr); ok {
				if execInTSEngine, _ := memo.CheckExprCanExecInTSEngine(element, memo.ExprPosProjList,
					mem.GetWhiteList().CheckWhiteListParam, false, mem.CheckOnlyOnePTagValue()); !execInTSEngine {
					// this path used to fallback because of memo.UnsupportedAggFuncOrExpr in outside-in plan,
					// but now outside-in supports this path and leave unsupported agg/expr to relational side
					colIDs := getColIDsOfExpr(element, 0, -1)
					tableIDs := make(map[opt.TableID]struct{})

					// if we can't find the colID or table group id, we can't determine the plan mode
					hasInvalidColID := false
					hasInvalidTgIdx := false

					for _, colID := range colIDs {
						if colID == -1 {
							hasInvalidColID = true
						}
						if colID != -1 {
							tableID := metaData.ColumnMeta(colID).Table
							tgIdx := mem.MultimodelHelper.GetTSTableIndexFromGroup(tableID)
							tableIDs[tableID] = struct{}{}
							if tgIdx == -1 {
								hasInvalidTgIdx = true
							}
						}
					}

					if hasInvalidColID || hasInvalidTgIdx {
						// if there is only one table group, we can still safely set its flag
						if len(mem.MultimodelHelper.TableGroup) == 1 {
							mem.MultimodelHelper.AggNotPushDown[0] = true
						} else {
							// otherwise, we need to set the flag to unsupported and fallback
							unsupportedMultiModel = true
							mem.MultimodelHelper.ResetReasons[memo.UnsupportedAggFuncOrExpr] = struct{}{}
						}
					} else {
						// only if the table is a timeseries table, we can't push down the aggregation,
						// otherwise we can consider both inside-out and outside-in
						for tableID := range tableIDs {
							if metaData.TableMeta(tableID).Table.GetTableType() == tree.TimeseriesTable {
								tgIdx := mem.MultimodelHelper.GetTSTableIndexFromGroup(tableID)
								if tgIdx != -1 {
									mem.MultimodelHelper.AggNotPushDown[tgIdx] = true
								}
							}
						}
					}
				}
			}
		}
	}

	// check data types of grouping cols
	groupingCols := expr.Private().(*memo.GroupingPrivate).GroupingCols
	groupingCols.ForEach(func(col opt.ColumnID) {
		if !memo.CheckDataType(metaData.ColumnMeta(col).Type) ||
			!memo.CheckDataLength(metaData.ColumnMeta(col).Type) {
			tableID := metaData.ColumnMeta(col).Table
			if tableID == 0 {
				if projectExpr, ok := expr.Child(0).(*memo.ProjectExpr); ok {
					for _, proj := range projectExpr.Projections {
						if proj.Col == col {
							if element, ok := proj.Element.(opt.Expr); ok {
								colID := getColIDOfExpr(element, 0, col)
								if colID != -1 {
									tableID = metaData.ColumnMeta(colID).Table
								}
							}
						}
					}
				}
			}
			tgIdx := mem.MultimodelHelper.GetTSTableIndexFromGroup(tableID)
			if tgIdx >= 0 && !mem.MultimodelHelper.AggNotPushDown[tgIdx] {
				if mem.MultimodelHelper.PlanMode[tgIdx] == memo.OutsideIn {
					mem.QueryType = memo.Unset
					mem.MultimodelHelper.ResetReasons[memo.UnsupportedPlanMode] = struct{}{}
				} else {
					mem.MultimodelHelper.PlanMode[tgIdx] = memo.InsideOut
				}
			} else {
				unsupportedMultiModel = true
				mem.MultimodelHelper.ResetReasons[memo.UnsupportedDataType] = struct{}{}
			}
		}
	})

	handleGroupByOrdering := false
	hasCountRow := false
	countRowTableIdx := -1
	skipProj := 0

	var avgAggs []struct {
		FuncID opt.ColumnID
		ColID  opt.ColumnID
	}
	countAggs := make(map[opt.ColumnID]opt.ColumnID)
	sumAggs := make(map[opt.ColumnID]opt.ColumnID)

	// check data types of aggregation columns
	aggregations := *expr.Child(1).(*memo.AggregationsExpr)
	for pIdx, agg := range aggregations {
		var aggColID opt.ColumnID
		var aggColTableID opt.TableID

		aggFuncID := agg.Col
		noPreGrouping := false
		// skip the pregrouping analysis for count(*) expression
		if agg.Agg.ChildCount() == 0 {
			if _, ok := agg.Agg.(*memo.CountRowsExpr); ok {
				aggColTableID = findTSTableID(expr, mem)
				hasCountRow = true
			}
		} else {
			aggColID = getColIDOfExpr(agg.Agg.Child(0), 0, -1)
			aggColTableID = mem.Metadata().ColumnMeta(aggColID).Table
		}

		if aggColTableID == 0 {
			noPreGrouping = true
			aggColID = getColIDOfExpr(expr.Child(0), pIdx-skipProj, aggColID)
			if aggColID != -1 {
				aggColTableID = mem.Metadata().ColumnMeta(aggColID).Table
			} else {
				skipProj++
			}
		} else {
			skipProj++
		}
		aggType := reflect.TypeOf(agg.Agg).Elem().Name()
		preGroupInfos := mem.MultimodelHelper.PreGroupInfos
		if len(preGroupInfos) < len(mem.MultimodelHelper.TableGroup) {
			for len(preGroupInfos) < len(mem.MultimodelHelper.TableGroup) {
				preGroupInfos = append(preGroupInfos, memo.PreGroupInfo{})
			}
		}

		groupIndex := mem.MultimodelHelper.GetTSTableIndexFromGroup(aggColTableID)
		if hasCountRow && countRowTableIdx == -1 {
			countRowTableIdx = groupIndex
		}

		if noPreGrouping {
			if aggColTableID == 0 {
				continue
			}
			if metaData.TableMeta(aggColTableID).Table.GetTableType() == tree.TimeseriesTable {
				mem.MultimodelHelper.AggNotPushDown[groupIndex] = true
				continue
			}
		}

		for i := 0; i < agg.Child(0).ChildCount(); i++ {
			if scalarExpr, ok := agg.Child(0).Child(i).(opt.ScalarExpr); ok {
				if !memo.CheckDataType(scalarExpr.DataType()) {
					if groupIndex >= 0 && !mem.MultimodelHelper.AggNotPushDown[groupIndex] {
						if mem.MultimodelHelper.PlanMode[groupIndex] == memo.OutsideIn {
							mem.QueryType = memo.Unset
							mem.MultimodelHelper.ResetReasons[memo.UnsupportedDataType] = struct{}{}
						} else {
							mem.MultimodelHelper.PlanMode[groupIndex] = memo.InsideOut
						}
					} else {
						unsupportedMultiModel = true
						mem.MultimodelHelper.ResetReasons[memo.UnsupportedDataType] = struct{}{}
					}
				}
			}
		}

		if metaData.TableMeta(aggColTableID).Table.GetTableType() != tree.TimeseriesTable {
			continue
		}

		if groupIndex != -1 {
			preGroupInfos[groupIndex].AggColumns = append(preGroupInfos[groupIndex].AggColumns, aggColID)
			preGroupInfos[groupIndex].AggFuncs = append(preGroupInfos[groupIndex].AggFuncs, aggType)
			switch aggType {
			case "CountExpr", "CountRowsExpr":
				// preGroupInfos[groupIndex].NewAggColumns = append(preGroupInfos[groupIndex].NewAggColumns, aggFuncID)
				preGroupInfos[groupIndex].OtherFuncColumns = append(preGroupInfos[groupIndex].OtherFuncColumns, aggFuncID)
				countAggs[aggColID] = aggFuncID
			case "AvgExpr":
				preGroupInfos[groupIndex].AvgFuncColumns = append(preGroupInfos[groupIndex].AvgFuncColumns, aggFuncID)
				avgAggs = append(avgAggs, struct {
					FuncID opt.ColumnID
					ColID  opt.ColumnID
				}{
					FuncID: aggFuncID,
					ColID:  aggColID,
				})
			case "SumExpr":
				preGroupInfos[groupIndex].OtherFuncColumns = append(preGroupInfos[groupIndex].OtherFuncColumns, aggFuncID)
				sumAggs[aggColID] = aggFuncID
			case "MaxExpr", "MinExpr":
				preGroupInfos[groupIndex].OtherFuncColumns = append(preGroupInfos[groupIndex].OtherFuncColumns, aggFuncID)
			case "LastExpr", "FirstExpr", "LastRowExpr", "FirstRowExpr":
				preGroupInfos[groupIndex].OtherFuncColumns = append(preGroupInfos[groupIndex].OtherFuncColumns, aggFuncID)
				mem.MultimodelHelper.HasLastAgg = true
				if mem.MultimodelHelper.PlanMode[groupIndex] == memo.InsideOut {
					mem.QueryType = memo.Unset
					mem.MultimodelHelper.ResetReasons[memo.UnsupportedPlanMode] = struct{}{}
				} else {
					mem.MultimodelHelper.PlanMode[groupIndex] = memo.OutsideIn
				}
			case "AggDistinctExpr":
				if mem.MultimodelHelper.PlanMode[groupIndex] != memo.InsideOut {
					mem.MultimodelHelper.PlanMode[groupIndex] = memo.OutsideIn
				} else {
					mem.MultimodelHelper.PlanMode[groupIndex] = memo.Undefined
					mem.MultimodelHelper.AggNotPushDown[groupIndex] = true
				}
			default:
				mem.MultimodelHelper.AggNotPushDown[groupIndex] = true
			}
			preGroupInfos[groupIndex].GroupByExprPos = append(preGroupInfos[groupIndex].GroupByExprPos, pIdx)
			preGroupInfos[groupIndex].BuildingPreGrouping = true

			var mappings1 []memo.AvgToCountOrSumMapping
			for _, avg := range avgAggs {
				countFuncID := opt.ColumnID(0)
				if matchedFuncID, exists := countAggs[avg.ColID]; exists {
					countFuncID = matchedFuncID
				}

				mappings1 = append(mappings1, memo.AvgToCountOrSumMapping{
					AvgFuncID: avg.FuncID,
					FuncID:    countFuncID,
				})
			}
			preGroupInfos[groupIndex].AvgToCountMapping = mappings1

			var mappings2 []memo.AvgToCountOrSumMapping
			for _, avg := range avgAggs {
				sumFuncID := opt.ColumnID(0)
				if matchedFuncID, exists := sumAggs[avg.ColID]; exists {
					sumFuncID = matchedFuncID
				}

				mappings2 = append(mappings2, memo.AvgToCountOrSumMapping{
					AvgFuncID: avg.FuncID,
					FuncID:    sumFuncID,
				})
			}
			preGroupInfos[groupIndex].AvgToSumMapping = mappings2

			if !handleGroupByOrdering {
				groupByInput := expr.Child(0).(memo.RelExpr)
				for _, c := range groupByInput.ProvidedPhysical().Ordering {
					found := false
					for _, col := range preGroupInfos[groupIndex].GroupingCols {
						if col == c.ID() {
							found = true
							break
						}
					}
					if !found {
						if !containsColumn(preGroupInfos[groupIndex].GroupingCols, aggColID) {
							preGroupInfos[groupIndex].GroupingCols = append(preGroupInfos[groupIndex].GroupingCols, c.ID())
						}
					}
				}
				handleGroupByOrdering = true
			}
		}

		mem.MultimodelHelper.PreGroupInfos = preGroupInfos

		srcExpr := agg.Agg
		hashCode := memo.GetExprHash(srcExpr)
		if groupIndex >= 0 && !mem.GetWhiteList().CheckWhiteListParam(hashCode, memo.ExprPosProjList) {
			mem.MultimodelHelper.AggNotPushDown[groupIndex] = true
		}
	}

	// if the agg in the multi-model query is countrows only so far, then don't choose inside-out plan
	// to guarantee the correct result as it is under join context
	if hasCountRow && countRowTableIdx != -1 && len(mem.MultimodelHelper.PreGroupInfos[countRowTableIdx].AggFuncs) == 1 {
		if mem.MultimodelHelper.PlanMode[countRowTableIdx] == memo.InsideOut {
			mem.QueryType = memo.Unset
			mem.MultimodelHelper.ResetReasons[memo.UnsupportedPlanMode] = struct{}{}
		} else {
			mem.MultimodelHelper.PlanMode[countRowTableIdx] = memo.OutsideIn
		}
	}

	// handle reduced grouping columns
	groupingCols.ForEach(func(col opt.ColumnID) {
		tgIdx := mem.MultimodelHelper.GetTableIndexFromGroup(metaData.ColumnMeta(col).Table)
		if tgIdx >= 0 {
			if !containsColumn(mem.MultimodelHelper.PreGroupInfos[tgIdx].GroupingCols, col) {
				mem.MultimodelHelper.PreGroupInfos[tgIdx].GroupingCols = append(mem.MultimodelHelper.PreGroupInfos[tgIdx].GroupingCols, col)
			}
		}
	})

	if unsupportedMultiModel {
		mem.QueryType = memo.Unset
	}
}

// processJoinRelations constructs TableGroup, a two-dimensional array that
// organizes time-series and relational tables based on their join relationships.
//
//   - Each subarray in TableGroup represents a group of related tables.
//   - The first element in each subarray is a timeseries table's TableID.
//   - The subsequent elements are relational tables that have a direct join
//     relationship with the timeseries table.
//   - If a table does not directly join a timeseries table but joins another
//     relational table, it is added to the same subarray as that relational table.
//
// **Limitations:**
//   - Cross joins are not supported, and if no no valid tables are found to
//     construct TableGroup, the plan will fall back
//   - Only equality conditions (EqExpr) are considered for table grouping;
//   - TableIDs must be resolvable from ColumnIDs; virtual or derived columns
//     without a clear TableID will not be included in TableGroup.
func processJoinRelations(
	joinPredicates memo.FiltersExpr, left memo.RelExpr, right memo.RelExpr, mem *memo.Memo,
) {
	tableGroup := mem.MultimodelHelper.TableGroup
	pendingRelations := make(map[opt.TableID][]opt.TableID)

	if joinPredicates.ChildCount() == 0 {
		if len(mem.MultimodelHelper.TableGroup) == 0 {
			mem.QueryType = memo.Unset
			mem.MultimodelHelper.ResetReasons[memo.UnsupportedCrossJoin] = struct{}{}
		} else {
			switch lc := left.(type) {
			case *memo.ScanExpr:
				if mem.MultimodelHelper.GetTSTableIndexFromGroup(lc.ScanPrivate.Table) == -1 {
					mem.MultimodelHelper.TableGroup[0] = append(mem.MultimodelHelper.TableGroup[0], lc.ScanPrivate.Table)
				}
			default:
			}
			switch rc := right.(type) {
			case *memo.ScanExpr:
				if mem.MultimodelHelper.GetTSTableIndexFromGroup(rc.ScanPrivate.Table) == -1 {
					mem.MultimodelHelper.TableGroup[0] = append(mem.MultimodelHelper.TableGroup[0], rc.ScanPrivate.Table)
				}
			default:
			}
		}
		return
	}

	for _, jp := range joinPredicates {
		if eqExpr, ok := jp.Condition.(*memo.EqExpr); ok {
			var leftTableID, rightTableID opt.TableID
			var leftColID, rightColID opt.ColumnID
			var leftIsTimeSeries, rightIsTimeSeries bool

			if leftVar, ok := eqExpr.Left.(*memo.VariableExpr); ok {
				leftColID = leftVar.Col
				leftTableID = mem.Metadata().ColumnMeta(leftColID).Table

				// handle virtual column
				if leftTableID == 0 {
					leftColID = getColIDOfExpr(left, 0, leftColID)
					if leftColID >= 0 {
						leftTableID = mem.Metadata().ColumnMeta(leftColID).Table
					}
				}
				if leftTableID != 0 {
					leftTable := mem.Metadata().Table(leftTableID)
					leftIsTimeSeries = leftTable.GetTableType() == tree.TimeseriesTable
				}
			}

			if rightVar, ok := eqExpr.Right.(*memo.VariableExpr); ok {
				rightColID = rightVar.Col
				rightTableID = mem.Metadata().ColumnMeta(rightColID).Table

				// handle virtual column
				if rightTableID == 0 {
					rightColID = getColIDOfExpr(right, 0, rightColID)
					if rightColID >= 0 {
						rightTableID = mem.Metadata().ColumnMeta(rightColID).Table
					}
				}
				if rightTableID != 0 {
					rightTable := mem.Metadata().Table(rightTableID)
					rightIsTimeSeries = rightTable.GetTableType() == tree.TimeseriesTable
				}
			}

			if leftIsTimeSeries || rightIsTimeSeries {
				tsTableID := leftTableID
				relTableID := rightTableID
				groupingColID := leftColID
				if !leftIsTimeSeries {
					tsTableID, relTableID = rightTableID, leftTableID
					groupingColID = rightColID
				}

				found := false
				var index int
				for i, component := range tableGroup {
					if component[0] == tsTableID {
						if !contains(component, relTableID) && relTableID != 0 {
							tableGroup[i] = append(component, relTableID)
						}
						index = i
						found = true
						break
					}
				}
				if !found && tsTableID != 0 {
					index = len(tableGroup)
					if relTableID != 0 {
						tableGroup = append(tableGroup, []opt.TableID{tsTableID, relTableID})
					} else {
						tableGroup = append(tableGroup, []opt.TableID{tsTableID})
					}
				}

				if leftIsTimeSeries {
					if rightColID != -1 && (!memo.CheckDataType(mem.Metadata().ColumnMeta(rightColID).Type) ||
						!memo.CheckDataLength(mem.Metadata().ColumnMeta(rightColID).Type)) {
						if mem.MultimodelHelper.PlanMode[index] == memo.OutsideIn {
							mem.QueryType = memo.Unset
							mem.MultimodelHelper.ResetReasons[memo.UnsupportedDataType] = struct{}{}
						} else {
							mem.MultimodelHelper.PlanMode[index] = memo.InsideOut
						}
					}
				} else {
					if leftColID != -1 && (!memo.CheckDataType(mem.Metadata().ColumnMeta(leftColID).Type) ||
						!memo.CheckDataLength(mem.Metadata().ColumnMeta(leftColID).Type)) {
						if mem.MultimodelHelper.PlanMode[index] == memo.OutsideIn {
							mem.QueryType = memo.Unset
							mem.MultimodelHelper.ResetReasons[memo.UnsupportedDataType] = struct{}{}
						} else {
							mem.MultimodelHelper.PlanMode[index] = memo.InsideOut
						}
					}
				}

				for len(mem.MultimodelHelper.PreGroupInfos) <= index {
					mem.MultimodelHelper.PreGroupInfos = append(mem.MultimodelHelper.PreGroupInfos, memo.PreGroupInfo{})
					mem.MultimodelHelper.AggNotPushDown = append(mem.MultimodelHelper.AggNotPushDown, false)
					mem.MultimodelHelper.PlanMode = append(mem.MultimodelHelper.PlanMode, memo.Undefined)
				}
				if !containsColumn(mem.MultimodelHelper.PreGroupInfos[index].GroupingCols, groupingColID) {
					mem.MultimodelHelper.PreGroupInfos[index].GroupingCols = append(mem.MultimodelHelper.PreGroupInfos[index].GroupingCols, groupingColID)
				}
			} else {
				pendingRelations[leftTableID] = append(pendingRelations[leftTableID], rightTableID)
				pendingRelations[rightTableID] = append(pendingRelations[rightTableID], leftTableID)
			}
		}
	}

	if len(pendingRelations) > 0 {
		processed := make(map[opt.TableID]bool)

		for tableID, connectedTables := range pendingRelations {
			alreadyAssigned := false
			for _, component := range tableGroup {
				if contains(component, tableID) {
					alreadyAssigned = true
					break
				}
			}
			if alreadyAssigned {
				processed[tableID] = true
				continue
			}

			assigned := false
			for i, component := range tableGroup {
				for _, tID := range component {
					if contains(connectedTables, tID) {
						tableGroup[i] = append(component, tableID)
						assigned = true
						break
					}
				}
				if assigned {
					break
				}
			}

			if assigned {
				processed[tableID] = true
			}
		}
	}

	mem.MultimodelHelper.TableGroup = tableGroup
}

func contains(list []opt.TableID, target opt.TableID) bool {
	for _, item := range list {
		if item == target {
			return true
		}
	}
	return false
}

func containsColumn(list []opt.ColumnID, target opt.ColumnID) bool {
	for _, item := range list {
		if item == target {
			return true
		}
	}
	return false
}

func checkIfJoinColsContainUniqueIndex(
	tableID opt.TableID,
	joinColIDs []opt.ColumnID,
	indexMapping map[opt.TableID][][]opt.ColumnID,
	mem *memo.Memo,
) bool {
	uniqueIndexes, exists := indexMapping[tableID]
	if !exists {
		return false
	}

	joinColSet := make(map[opt.ColumnID]bool)
	for _, col := range joinColIDs {
		adjustCol := mem.Metadata().GetTagIDByColumnID(col) + 1
		joinColSet[opt.ColumnID(adjustCol)] = true
	}

	for _, uniqueIndexCols := range uniqueIndexes {
		if isSubset(uniqueIndexCols, joinColSet) {
			return true
		}
	}

	return false
}

func isSubset(indexCols []opt.ColumnID, joinColSet map[opt.ColumnID]bool) bool {
	for _, col := range indexCols {
		if !joinColSet[col] {
			return false
		}
	}
	return true
}

// JoinOrderBuilder returns the JoinOrderBuilder instance that the optimizer is
// currently using to reorder join trees.
func (o *Optimizer) JoinOrderBuilder() *JoinOrderBuilder {
	return o.jb
}
